This notebook produces the numbers and figures concerning the statistical significance of the trends present in the data.
In [1]:
# --- automatically run the required setup cells of another notebook
#     (that notebook must be saved to disk for this to work!)
import mutils.io as mio
import mutils.misc as mi
import mutils.FDatAn as fda
# load basic config (the executed cells set up `conf` and the workspace `ws1`)
%cd /home/moritz/mmnotebooks
nb_name = 'AnkleSLIP - find minimal model- Version 3.ipynb'
mi.run_nbcells(nb_name, ['0'])
conf.subject = 3
conf.quiet = True
conf.startup_compute_PCA = False
mi.run_nbcells(nb_name, ['1'])  # load data
print "\nnotebook initialized"
In [8]:
# prepare data: column-wise mean-centering (the SFA below expects zero-mean data)
# note: -= operates in place, so the arrays stored in ws1.dataset_full are centered as well
dat_r = ws1.dataset_full.all_kin_r
dat_r -= mean(dat_r, axis=0)
dat_l = ws1.dataset_full.all_kin_l
dat_l -= mean(dat_l, axis=0)
In [2]:
# use the mdp toolkit (disabled: re-implemented manually below)
if False:
    import mdp
    # other possible methods:
    # SF = mdp.nodes.SFANode()      # slow feature analysis
    # IC = mdp.nodes.FastICANode()  # independent component analysis
    ISFR = mdp.nodes.SFANode()
    ISFL = mdp.nodes.SFANode()
    # train the nodes and perform SFA
    isfa_r = ISFR(dat_r)
    isfa_l = ISFL(dat_l)
In [3]:
# SFA ultimately yields a linear transformation of the data, as this
# (commented-out) check shows; it requires the trained mdp node from above:
# -> fit a linear map between the (zero-mean) input and output data:
#    A = dot(isfa_r.T, pinv(dat_r.T))
# -> compare it to the transformation matrix stored in the SFA node (attribute "sf"):
#    allclose(A.T, ISFR.sf)
# -> the transformed data are orthonormal (up to a constant scale):
#    sfd = ISFR(dat_r)
#    allclose(dot(sfd.T, sfd) / (sfd.shape[0] - 1), eye(sfd.shape[1]))
#    -> the covariance is proportional to the identity matrix!
In [47]:
# manually implement SFA
def SFA(dat, nskip=0):
    """
    implements slow feature analysis

    :args:
        dat (n-by-p array): matrix with n observations of p dimensions;
            MUST BE ZERO-MEAN
        nskip (int): remove the <nskip> slowest axes from the data

    :returns:
        sdat (n-by-p array): re-ordered data (ordered by slowness)
        smat (p-by-p array): corresponding scaling matrix: sdat = dat * smat.T
        lpmat (p-by-p array): to discard the slowest <nskip> dimensions,
            use: ldat = dat * lpmat.T
    """
    # compute "whitened" data using PCA (SVD of the data matrix)
    u, s, v = svd(dat.T, full_matrices=False)
    # compute the time derivative of the whitened data
    vprime = vstack([diff(v[row, :]) for row in range(v.shape[0])])
    # find the "fastest" and "slowest" axes using PCA on the derivative
    # (axes with maximal / minimal magnitude)
    up, sp, vp = svd(vprime, full_matrices=False)
    # matrix that projects the original data onto these axes; reversing the
    # column order sorts from slowest to fastest
    cmat = reduce(dot, [up[:, ::-1].T, diag(1. / s), u.T])
    # the result is ordered by slowness, the first columns being the slowest
    sdat = dot(dat, cmat.T)
    # construct the corresponding detrending filter:
    anmat = eye(dat.shape[1])  # annihilation matrix for the first nskip axes
    if nskip > 0:
        anmat[:nskip, :nskip] = 0
    lpmat = reduce(dot, [u, diag(s), up[:, ::-1], anmat, up[:, ::-1].T,
                         diag(1. / s), u.T])
    return sdat, cmat, lpmat
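A quick self-check of SFA() on synthetic data (a sketch added for illustration, not part of the original analysis): the slowest input signal should end up in the first column, and the returned scaling matrix must reproduce the transformed data.
In [ ]:
# sketch: verify SFA() on synthetic signals of very different time scales
t = linspace(0, 10 * pi, 2000)
test = vstack([sin(0.05 * t),    # very slow component
               randn(len(t)),    # fast (noise) component
               sin(3 * t)]).T    # intermediate component
test -= mean(test, axis=0)       # SFA requires zero-mean input
sdat, smat, lpmat = SFA(test, nskip=1)
print "sdat == dat * smat.T:", allclose(sdat, dot(test, smat.T))
# the slowest SFA component should be (anti-)proportional to the slow sine
print "|corr| of slowest component with the slow sine:", \
    abs(corrcoef(sdat[:, 0], sin(0.05 * t))[0, 1])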
In [48]:
lpdat_r, cmat_r, lpmat_r = SFA(dat_r, 7)
lpdat_l, cmat_l, lpmat_l = SFA(dat_l, 7)
# detrend the data once more, to demonstrate how lpmat is used
lpdat = dot(lpmat_r, dat_r.T).T  # "low-passed" (detrended) data
trend = dat_r - lpdat
# visualize
vdim = 0  # coordinate to display: CoM height (z)
import scipy.signal as sig
figure(figsize=(12,6))
plot(lpdat[:, vdim] + .025,'r+', label='detrended data')
plot(sig.medfilt(lpdat[:, vdim], 21) + .025,'r-',lw=3)
plot(dat_r[:, vdim],'g+', label='original data')
plot(sig.medfilt(dat_r[:, vdim], 21),'g-', lw=3)
plot(trend[:, vdim] + .05,'b+', label='trend')
grid('on')
legend()
xlabel('stride #')
Out[48]:
In [12]:
figure()
rvar = var(trend, axis=0)/var(dat_r, axis=0)
plot(rvar,'d')
grid('on')
xlabel('coordinate #')
title('variance of trend / total variance' )
assert(len(ws1.dataset_full.kin_labels) == dat_r.shape[1])
print "coordinates with high trend:"
for dim in range(dat_r.shape[1]):
if rvar[dim] > .4:
print ws1.dataset_full.kin_labels[dim], ": ", rvar[dim]
In [14]:
# visualize "slowness components"
figure(figsize=(10,10))
for dim in range(40):
plot(lpdat_r[:, dim] / (5*std(lpdat_r[:, dim])) + dim,',' )
ylim(-1, 41)
xlabel('stride #')
ylabel('score on coordinate #, ordered by slowness')
pass
In [41]:
#import scipy.io as sio
# sio.savemat('sfa_demo.mat', {'dat1' : ws1.dataset_full.all_kin_r, 'dat2' :ws1.dataset_full.all_kin_l }, do_compression=True)
In [49]:
# eigenvalues of the return maps of the detrended data
_, _, lpmat_r = SFA(dat_r, 10)
_, _, lpmat_l = SFA(dat_l, 10)
odat_r = dot(dat_r, lpmat_r.T)  # detrended: 10 slowest axes removed
odat_l = dot(dat_l, lpmat_l.T)
_, maps, _ = fda.fitData(odat_r[:-1,:],odat_r[1:,:], nrep=50, rcond=1e-8)
_, m1, _ = fda.fitData(odat_r, odat_l, nrep=50, rcond=1e-8)
_, m2, _ = fda.fitData(odat_l[:-1, :], odat_r[1:, :], nrep=50, rcond=1e-8)
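fda.fitData is used as a black box here (among other things it returns a list of nrep fitted matrices). The sketch below shows the presumed idea -- bootstrap-resample the strides and fit a linear map by least squares per resample -- purely for illustration; fit_data_sketch is a hypothetical stand-in, not the mutils implementation.
In [ ]:
# sketch (assumption): bootstrapped linear regression, roughly what
# fda.fitData(X, Y, nrep, rcond) is used for here
import numpy as np

def fit_data_sketch(X, Y, nrep=50, rcond=1e-8):
    """return nrep regression matrices A with Y ~ X A.T, each fitted
    on a bootstrap resample of the rows (observations)"""
    maps = []
    for rep in range(nrep):
        idx = np.random.randint(0, X.shape[0], X.shape[0])  # sample with replacement
        A = np.dot(np.linalg.pinv(X[idx, :], rcond=rcond), Y[idx, :]).T
        maps.append(A)
    return maps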
In [50]:
# compose the two half-stride maps into a full-stride (two-section) map
cmaps = [dot(m2_, m1_) for m1_, m2_ in zip(m1, m2)]
vi1 = ut.VisEig(127)  # eigenvalue visualization (ut is provided by the setup cells)
vi2 = ut.VisEig(127)
for A in maps:
    vi1.add(eig(A)[0])
for A in cmaps:
    vi2.add(eig(A)[0])
# expected eigenvalue radius for regressions on pure iid white-noise data;
# the factor (1 - exp(-1)) is the expected fraction of unique samples in a
# bootstrap resample
r_noise = sqrt(odat_l.shape[1] / ((1 - exp(-1)) * odat_l.shape[0]))
tn = linspace(0, 2 * pi, 50)
figure(figsize=(10,5))
subplot(1,2,1)
vi1.vis1()
plot(r_noise*sin(tn), r_noise*cos(tn),'r',lw=2, label='est. noise floor')
legend()
xlim(-1,1)
ylim(-1,1)
title('single section map')
subplot(1,2,2)
vi2.vis1()
title('two-section based map')
xlim(-1,1)
ylim(-1,1)
Out[50]:
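The noise-floor radius can be checked empirically on genuinely independent white noise (a sketch, using the hypothetical fit_data_sketch helper from above): if the eigenvalues roughly fill a disk of radius r_noise, the 95th percentile of their moduli should come out just below r_noise.
In [ ]:
# sketch: check the noise-floor radius on independent white-noise data
import numpy as np
n, p = odat_r.shape
Xn = np.random.randn(n, p)
Yn = np.random.randn(n, p)
ev = []
for A in fit_data_sketch(Xn, Yn, nrep=50):
    ev.extend(np.abs(np.linalg.eigvals(A)))
print "predicted noise floor:", sqrt(p / ((1 - exp(-1)) * n))
print "95th percentile of |eigenvalues|:", np.percentile(ev, 95)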
In [35]:
import mutils.FDatAn as fda
In [36]:
# compare the predictive power of full and detrended data
# (st.predTest is provided by the setup cells; ldat_* use the older
#  moving-average detrending for comparison)
ldat_r = fda.dt_movingavg(dat_r, 25)
ldat_l = fda.dt_movingavg(dat_l, 25)
v1r = st.predTest(dat_r, dat_l)
v2r = st.predTest(odat_r, odat_l)
v3r = st.predTest(ldat_r, ldat_l)
v1l = st.predTest(dat_l[:-1,:], dat_r[1:,:])
v2l = st.predTest(odat_l[:-1,:], odat_r[1:,:])
v3l = st.predTest(ldat_l[:-1,:], ldat_r[1:,:])
v1st = st.predTest(dat_r[:-1,:], dat_r[1:,:])
v2st = st.predTest(odat_r[:-1,:], odat_r[1:,:])
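st.predTest is also used as a black box; a sketch of the presumed idea (an assumption, not the mutils implementation): repeatedly fit a linear prediction on a random training subset and report the out-of-sample residual variance relative to the data variance, per coordinate -- the "rel. rem. var." plotted below.
In [ ]:
# sketch (assumption): out-of-sample prediction test, per coordinate
import numpy as np

def pred_test_sketch(X, Y, nrep=50, train_frac=2. / 3.):
    """relative remaining variance of out-of-sample predictions Y ~ X A.T;
    returns an nrep-by-p array (one row per random train/test split)"""
    n = X.shape[0]
    res = []
    for rep in range(nrep):
        idx = np.random.permutation(n)
        tr, te = idx[:int(train_frac * n)], idx[int(train_frac * n):]
        A = np.dot(np.linalg.pinv(X[tr, :]), Y[tr, :]).T
        resid = Y[te, :] - np.dot(X[te, :], A.T)
        res.append(np.var(resid, axis=0) / np.var(Y[te, :], axis=0))
    return np.vstack(res)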
In [39]:
figure(figsize=(12,5))
subplot(1,3,1)
plot(mean(v1r, axis=0),'kd', label='original')
plot(mean(v2r, axis=0),'rd', label='detrended (new)')
plot(mean(v3r, axis=0),'gd', label='detrended (old)')
title('predicting left apex')
legend(loc='best')
ylabel('rel. rem. var.')
ylim(0,1.3)
plot([0, v1r.shape[1]],[1, 1],'k--')
subplot(1,3,2)
plot(mean(v1l, axis=0),'kd', label='original data')
plot(mean(v2l, axis=0),'rd', label='detrended data (new)')
plot(mean(v3l, axis=0),'gd', label='detrended data (old)')
title('predicting right apex')
plot([0, v1l.shape[1]],[1, 1],'k--')
ylim(0,1.3)
subplot(1,3,3)
plot(mean(v1st, axis=0),'kd', label='original data')
plot(mean(v2st, axis=0),'rd', label='detrended data (new)')
title('predicting 1 stride')
plot([0, v1st.shape[1]],[1, 1],'k--')
ylim(0,1.3)
pass
In [ ]:
# compute PCAs
if True:
    d1d = ws1.k.make_1D(nps=50)[:, 100:]
    d1m = mean(d1d, axis=0)
    d1d -= d1m
    u, s, v = svd(d1d, full_matrices=False)
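Bookkeeping reminder for the SVD-based PCA above (nothing notebook-specific): with d1d = u * diag(s) * v, the PC scores are the columns of u (up to scaling by s), and the explained-variance fractions follow directly from the singular values.
In [ ]:
# explained variance per principal component (follows from the SVD above)
explained = s ** 2 / sum(s ** 2)
print "variance explained by the first 5 PCs:", explained[:5]
print "cumulative:", cumsum(explained)[:5]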
In [ ]:
# visualize
figure(figsize=(8, 6))
for pc in range(5):
    subplot(5, 1, pc + 1)
    if pc == 0:
        title('scores on the principal components of the motion')
    plot(u.T[pc, :] / std(u.T[pc, :]), 'k.', markersize=2)
    grid('on')
    ylabel('pc #%i' % (pc + 1))
    gca().set_yticks([-2, 0, 2])
    if pc < 4:
        gca().set_xticklabels([])
    else:
        xlabel('stride number')
subplots_adjust(hspace=.1)
savefig('img/pc_s{}.png'.format(conf.subject), dpi=300)
savefig('img/pc_s{}.pdf'.format(conf.subject))
In [ ]:
p1 = u.T[0, :]
p2 = u.T[1, :]
p3 = u.T[2, :]
dt1 = fda.dt_movingavg(p1, conf.dt_window, conf.dt_medfilter)
dt2 = fda.dt_movingavg(p2, conf.dt_window, conf.dt_medfilter)
dt3 = fda.dt_movingavg(p3, conf.dt_window, conf.dt_medfilter)
print "ratio of std(trend) / std(local fluctuations)"
print std(p1 - dt1) / std(dt1)
print std(p2 - dt2) / std(dt2)
print std(p3 - dt3) / std(dt3)
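fda.dt_movingavg is used as a black box throughout this notebook. Below is a minimal stand-in, under the assumption that it subtracts a centered moving average (or, with the flag set, a moving median) of width 2*window + 1 from each column; dt_movingavg_sketch is hypothetical, not the mutils code.
In [ ]:
# sketch (assumption): moving-average / moving-median detrending
import numpy as np
import scipy.signal as sig

def dt_movingavg_sketch(data, w, use_median=False):
    """subtract a centered moving average (or median) of width 2*w + 1
    from each column; 1D input is treated as a single column"""
    arr = np.atleast_2d(np.asarray(data, dtype=float).T).T
    out = np.empty_like(arr)
    for col in range(arr.shape[1]):
        x = arr[:, col]
        if use_median:
            baseline = sig.medfilt(x, 2 * w + 1)
        else:
            baseline = np.convolve(x, np.ones(2 * w + 1) / (2. * w + 1.),
                                   mode='same')
        out[:, col] = x - baseline
    return out.squeeze()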
In [ ]:
# generate an AR(1)-similar surrogate process in Cython (about 10x faster than pure Python)
%load_ext cythonmagic
In [ ]:
%%cython -lm
import numpy as np
cimport numpy as np
from libc.math cimport sqrt  # much, much faster than np.sqrt

def cy_ar_similar(np.ndarray[np.float_t, ndim=1] data):
    """generate an AR(1) surrogate with the same lag-1 autocorrelation,
    mean and standard deviation as <data>"""
    cdef np.ndarray[np.float_t, ndim=1] dat
    dat = data - np.mean(data)
    # lag-1 autocorrelation of the (centered) data
    alpha = np.dot(dat[1:], dat[:-1]) / sqrt(np.dot(dat[1:], dat[1:]) *
                                             np.dot(dat[:-1], dat[:-1]))
    # iterate the AR(1) process: res[t] = alpha * res[t-1] + noise
    res = np.zeros_like(dat)
    xi = np.random.randn(len(dat))
    for x in range(len(dat)):
        res[x] = res[x - 1] * alpha + xi[x]
    # rescale to the original mean and standard deviation
    res = res - np.mean(res)
    return res / np.std(res) * np.std(dat) + np.mean(data)
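For environments without Cython, the same surrogate can be generated with scipy.signal.lfilter (a sketch: the recursion res[t] = alpha * res[t-1] + xi[t] is exactly an IIR filter applied to white noise).
In [ ]:
# sketch: pure-NumPy/SciPy equivalent of cy_ar_similar
import numpy as np
import scipy.signal as sig

def ar_similar_np(data):
    dat = data - np.mean(data)
    alpha = np.dot(dat[1:], dat[:-1]) / np.sqrt(np.dot(dat[1:], dat[1:]) *
                                                np.dot(dat[:-1], dat[:-1]))
    # AR(1) recursion as an IIR filter: res[t] = alpha * res[t-1] + xi[t]
    res = sig.lfilter([1.], [1., -alpha], np.random.randn(len(dat)))
    res -= np.mean(res)
    return res / np.std(res) * np.std(dat) + np.mean(data)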
In [ ]:
# sanity check: dt_movingavg treats 1D and n-by-1 input identically
dat = randn(1000)
d1 = fda.dt_movingavg(dat, 15, False)
d2 = fda.dt_movingavg(dat[:, newaxis], 15, False).squeeze()
print "equal:", allclose(d1, d2)
d1.shape
In [ ]:
def trend(dat):
    """standard deviation of the trend (data minus locally detrended data)"""
    tr = dat - fda.dt_movingavg(dat, conf.dt_window, conf.dt_medfilter)
    return std(tr)

def report(dim, lbl, t0, ens, ninner):
    """print how far the observed trend t0 lies outside the null ensemble"""
    print "param", dim, ", %s: delta in stds:" % lbl, (t0 - mean(ens)) / std(ens),
    qtr = sort(array(ens))
    # empirical quantile of t0 within the sorted null ensemble
    print " in quantiles:", (argmin(abs(qtr - t0)) + 1) / float(ninner)

ninner = 10000
print "subject", conf.subject
# two null ensembles per coordinate: randomly permuted data (destroys all
# temporal structure) and AR(1)-similar surrogates (keep lag-1 correlation)
for name, data, ndim in [
        ("right step params:", ws1.dataset_full.all_param_r, 5),
        ("left step params:", ws1.dataset_full.all_param_l, 5),
        ("right IC:", ws1.dataset_full.all_IC_r, 3),
        ("left IC:", ws1.dataset_full.all_IC_l, 3)]:
    print name
    for dim in range(ndim):
        dat = data[:, dim]
        t0 = trend(dat)
        all_t = [trend(dat[permutation(len(dat))]) for rep in range(ninner)]
        all_ts = [trend(cy_ar_similar(dat)) for rep in range(ninner)]
        report(dim, "randomized", t0, all_t, ninner)
        report(dim, "ar_similar", t0, all_ts, ninner)
In [ ]:
# build surrogate data for one example parameter
dat = ws1.dataset_full.all_param_r[:, 0]
def gettrend(dat):
    return dat - fda.dt_movingavg(dat, conf.dt_window, conf.dt_medfilter)
vc_d = [cy_ar_similar(dat) for x in range(100)]  # surrogate time series
vc = [gettrend(x) for x in vc_d]                 # their trends
vc = vstack(vc).T
vc_d = vstack(vc_d).T
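A quick sanity check (a sketch, not from the original notebook): the surrogates should reproduce the lag-1 autocorrelation of the experimental series.
In [ ]:
# sketch: verify that the surrogates preserve the lag-1 autocorrelation
def lag1_autocorr(x):
    x = x - mean(x)
    return dot(x[1:], x[:-1]) / sqrt(dot(x[1:], x[1:]) * dot(x[:-1], x[:-1]))

print "data:      ", lag1_autocorr(dat)
surr_ac = array([lag1_autocorr(vc_d[:, rep]) for rep in range(vc_d.shape[1])])
print "surrogates:", mean(surr_ac), "+-", std(surr_ac)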
In [ ]:
figure(figsize=(6, 4))
ax1 = subplot(2, 1, 2)
for rep in range(vc.shape[1]):
    plot(vc[:, rep], 'b,', lw=1, markersize=1)
plot(vc[:2, 0], 'b.', markersize=2, label='surrogate data')
plot(gettrend(dat), 'r.', markersize=3, label='experimental data')
ax1.set_yticks([170, 180])
ax1.set_ylim(163, 187)
ax1.set_title('trend of leg stiffness')
ax2 = subplot(2, 1, 1)
for rep in range(vc_d.shape[1]):
    plot(vc_d[:, rep], 'b,', lw=1, alpha=.5)
plot(vc_d[:2, 0], 'b.', markersize=2, label='surrogate data')
plot(dat, 'r.', markersize=3, label='experimental data')
ax2.set_yticks([125, 150, 175, 200, 225])
ax2.set_ylim(115, 225)
import matplotlib as mpl
mpl.rcParams['legend.fontsize'] = 9
import matplotlib.font_manager as fmt
FM = fmt.FontManager()
fnt = FM.findfont('Arial')
fp = fmt.FontProperties(fname=fnt)
fp.set_size(9.0)
leg = legend()
for label in leg.get_texts():
    label.set_fontproperties(fp)
ylabel('relative leg stiffness [Nm/kg]')
ax2.get_yaxis().set_label_coords(-0.08, 0)
title('leg stiffness for individual right steps')
ax1.set_xlabel('stride number')
ax2.set_xticklabels([])
savefig('img/trends_example_s{}.png'.format(conf.subject), dpi=300)
savefig('img/trends_example_s{}.pdf'.format(conf.subject))
In [ ]:
dat.shape
In [ ]:
print "magnitude of trend vs. locals"
print std(gettrend(dat)) / std(fda.dt_movingavg(dat, conf.dt_window, conf.dt_medfilter))
print "magnitude of trend vs. locals (one surrogate)"
rat = array([std(gettrend(vc_d[:, dim])) / std(fda.dt_movingavg(vc_d[:, dim], conf.dt_window, conf.dt_medfilter))
for dim in range(vc_d.shape[1])])
print mean(rat), "+-", std(rat)
In [ ]:
vc_d.shape
In [ ]:
# configuration
c = mio.saveable()
c.useLast = True
c.useMedian = False
c.dt_median = False
c.dt = False
c.vis_all = False

# compute variance as a function of window length (function definition)
def wvar(data, wlen, useLast=False, useMedian=False):
    """average the per-window variance over consecutive windows of length
    <wlen>; if useLast is False, a trailing partial window is discarded"""
    s = data.shape[0]
    v = []
    pos = 0
    while pos < s - 1:
        if (pos + wlen > s) and not useLast:
            break
        v.append(var(data[pos:pos + wlen, ...], axis=0, ddof=1))
        pos += wlen
    if useMedian:
        return median(vstack(v), axis=0)
    return mean(vstack(v), axis=0)
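A quick check of wvar (a sketch added for illustration): for iid white noise the windowed variance estimate should be flat across window lengths -- exactly the property the shuffling test below exploits.
In [ ]:
# sketch: wvar on iid white noise is flat across window lengths (values ~ 1)
wn = randn(2000, 3)
for wl in [5, 20, 100, 500]:
    print "window length %4i:" % wl, wvar(wn, wl, c.useLast, c.useMedian)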
In [ ]:
# load and preprocess data
kar = ws1.k.get_kin_apex(ws1.dataset_full.all_phases_r)
kal = ws1.k.get_kin_apex(ws1.dataset_full.all_phases_l)
kar = hstack(kar)[2:,:].T
kal = hstack(kal)[2:,:].T
kar -= mean(kar, axis=0)
kal -= mean(kal, axis=0)
In [ ]:
dt_lengths = [0, 10, 20, 30, 40, 50]  # 0: no detrending
nwin = 20   # number of different window lengths
nboot = 50  # iterations for the permutations
all_nwv2 = []
all_wv1 = []
import sys
for dt_len in dt_lengths:
    if dt_len > 0:
        print "dt_len", 2 * dt_len + 1
        kal_dt = fda.dt_movingavg(kal, dt_len, c.dt_median)
        kar_dt = fda.dt_movingavg(kar, dt_len, c.dt_median)
    else:
        print "no detrending"
        kal_dt = kal
        kar_dt = kar
    # regression:
    Al = dot(pinv(kal_dt[:-1, :], rcond=1e-12), kar_dt[1:, :]).T
    Ar = dot(pinv(kar_dt, rcond=1e-12), kal_dt).T
    # predictions
    pred_r = dot(Al, kal_dt[:-1, :].T).T
    pred_l = dot(Ar, kar_dt.T).T
    resid_r = kar_dt[1:, :] - pred_r
    resid_l = kal_dt - pred_l
    wlens = logspace(.7, 3, nwin).astype(int)
    # compute for the original residuals
    wv1 = vstack([wvar(resid_l, wl, c.useLast, c.useMedian) for wl in wlens])
    # compute for shuffled residuals
    nwv2 = []
    print "=" * nboot
    for rep in range(nboot):
        resid_l_p = resid_l[permutation(resid_l.shape[0]), :]
        nwv2.append(vstack([wvar(resid_l_p, wl, c.useLast, c.useMedian)
                            for wl in wlens]))
        sys.stdout.write('.')
    print "\ndone"
    all_nwv2.append(array(nwv2))
    all_wv1.append(wv1)
In [ ]:
# visualize
import mutils.plotting as muplt
cols = muplt.colorset_distinct
for dim in [0, 2, 6, 15, 20]:
    scale = mean(all_nwv2[0][:, :, dim])
    figure(figsize=(14, 8))
    for nr in range(len(all_nwv2)):
        wv1 = all_wv1[nr]
        nwv2 = all_nwv2[nr]
        subplot(len(all_nwv2), 1, nr + 1)
        lbl1 = "None" if dt_lengths[nr] < 1 else "{} frames".format(dt_lengths[nr] * 2 + 1)
        semilogx(wlens, wv1[:, dim] / scale, color=cols[nr], marker='d', ls='None')
        semilogx(wlens, mean(nwv2[:, :, dim], axis=0) / scale, color=cols[nr], label=lbl1)
        semilogx(wlens, (mean(nwv2[:, :, dim], axis=0) + std(nwv2[:, :, dim], axis=0)) / scale,
                 color=cols[nr], ls='--')
        semilogx(wlens, (mean(nwv2[:, :, dim], axis=0) - std(nwv2[:, :, dim], axis=0)) / scale,
                 color=cols[nr], ls='--')
        semilogx([2, 1500], [1, 1], 'k--')
        if nr == 0:
            if dim == 0:
                co_str = 'CoM height'
            elif dim == 2:
                co_str = 'lat. pos. of right ankle'
            elif dim == 6:
                co_str = 'lat. pos. of left ankle'
            elif dim == 15:
                co_str = 'horiz. pos. of right ankle'
            elif dim == 20:
                co_str = 'horiz. pos. of left ankle'
            title(''.join(['subject {}: '.format(conf.subject),
                           'variance of {} as function of window length for different detrendings'.format(co_str),
                           ' (lines: shuffled residuals)']))
        if nr == len(all_nwv2) - 1:
            xlabel('length of window for each variance computation')
        else:
            gca().set_xticklabels([])
        if nr == 2:
            ylabel('normalized variance')
        legend()
        ylim(.80, 1.1)
        yticks([.85, .9, .95, 1, 1.05])
        xlim(3, 1200)
    savefig('img/dt_s{}d{}.pdf'.format(conf.subject, dim))
In [ ]:
figure()
plot(resid_l[:,0],'+')
In [ ]:
resid_l.shape
In [ ]:
resid_r_p = resid_r[permutation(resid_r.shape[0]),:]
wv1r = [wvar(resid_r, wl) for wl in range(5,500)]
wv2r = [wvar(resid_r_p, wl) for wl in range(5,500)]
In [ ]:
figure(figsize=(14,4))
plot(wv1r / mean(wv2r),'r',wv2r/mean(wv2r),'b')
In [ ]:
var(resid_r[1000:1020,...])
In [ ]:
var(randn(1000), )